diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-07-05 15:51:29 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-07-05 15:51:29 -0700 |
| commit | 372d03889b1398016056d1641d31705b75d97c9a (patch) | |
| tree | e26d06ee7e07820e8da08e9259832b2ce4180934 /src | |
| parent | 0b08b59786e4af19f57bf00ebc20ee8f96453679 (diff) | |
| download | nng-372d03889b1398016056d1641d31705b75d97c9a.tar.gz nng-372d03889b1398016056d1641d31705b75d97c9a.tar.bz2 nng-372d03889b1398016056d1641d31705b75d97c9a.zip | |
Move IPC negotiation out of connect/accept.
This prevents a slow partner from blocking new connections from being
established on the server. Before this a single partner could cause
the server to block waiting to complete the negotiation.
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/pipe.c | 31 | ||||
| -rw-r--r-- | src/core/pipe.h | 1 | ||||
| -rw-r--r-- | src/transport/ipc/ipc.c | 86 |
3 files changed, 65 insertions, 53 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c index 08d3dee7..eaf4fa1c 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -53,6 +53,7 @@ nni_pipe_dtor(void *ptr) p->p_tran_ops.p_fini(p->p_tran_data); } + nni_aio_fini(&p->p_start_aio); nni_mtx_fini(&p->p_mtx); NNI_FREE_STRUCT(p); } @@ -170,6 +171,28 @@ nni_pipe_peer(nni_pipe *p) } +static void +nni_pipe_start_cb(void *arg) +{ + nni_pipe *p = arg; + nni_aio *aio = &p->p_start_aio; + int rv; + + nni_mtx_lock(&p->p_mtx); + if ((rv = nni_aio_result(aio)) != 0) { + nni_mtx_unlock(&p->p_mtx); + nni_pipe_stop(p); + return; + } + + nni_mtx_unlock(&p->p_mtx); + + if ((rv = nni_sock_pipe_ready(p->p_sock, p)) != 0) { + nni_pipe_stop(p); + } +} + + int nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran) { @@ -182,6 +205,10 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran) if (rv != 0) { return (rv); } + if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) != 0) { + nni_objhash_unref(nni_pipes, p->p_id); + return (rv); + } p->p_sock = sock; p->p_ep = ep; @@ -225,10 +252,12 @@ nni_pipe_start(nni_pipe *p) { int rv; - if ((rv = nni_sock_pipe_ready(p->p_sock, p)) != 0) { + if (p->p_tran_ops.p_start == NULL) { + rv = nni_sock_pipe_ready(p->p_sock, p); return (rv); } + p->p_tran_ops.p_start(p->p_tran_data, &p->p_start_aio); // XXX: Publish event return (0); diff --git a/src/core/pipe.h b/src/core/pipe.h index e8e6ecad..f7df8232 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -32,6 +32,7 @@ struct nni_pipe { int p_stop; nni_mtx p_mtx; nni_taskq_ent p_reap_tqe; + nni_aio p_start_aio; }; extern int nni_pipe_sys_init(void); diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 7a486b4d..6ad9da75 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -140,7 +140,7 @@ fail: static void -nni_ipc_cancel_nego(nni_aio *aio) +nni_ipc_cancel_start(nni_aio *aio) { nni_ipc_pipe *pipe = aio->a_prov_data; @@ -411,6 +411,38 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio) } +static void +nni_ipc_pipe_start(void *arg, nni_aio *aio) +{ + nni_ipc_pipe *pipe = arg; + int rv; + + nni_mtx_lock(&pipe->mtx); + pipe->txhead[0] = 0; + pipe->txhead[1] = 'S'; + pipe->txhead[2] = 'P'; + pipe->txhead[3] = 0; + NNI_PUT16(&pipe->txhead[4], pipe->proto); + NNI_PUT16(&pipe->txhead[6], 0); + + pipe->user_negaio = aio; + pipe->gotrxhead = 0; + pipe->gottxhead = 0; + pipe->wantrxhead = 8; + pipe->wanttxhead = 8; + pipe->negaio.a_niov = 1; + pipe->negaio.a_iov[0].iov_len = 8; + pipe->negaio.a_iov[0].iov_buf = &pipe->txhead[0]; + rv = nni_aio_start(aio, nni_ipc_cancel_start, pipe); + if (rv != 0) { + nni_mtx_unlock(&pipe->mtx); + return; + } + nni_plat_ipc_send(pipe->isp, &pipe->negaio); + nni_mtx_unlock(&pipe->mtx); +} + + static uint16_t nni_ipc_pipe_peer(void *arg) { @@ -491,49 +523,6 @@ nni_ipc_ep_close(void *arg) static int -nni_ipc_negotiate(nni_ipc_pipe *pipe) -{ - int rv; - nni_iov iov; - uint8_t buf[8]; - nni_aio aio; - - pipe->txhead[0] = 0; - pipe->txhead[1] = 'S'; - pipe->txhead[2] = 'P'; - pipe->txhead[3] = 0; - NNI_PUT16(&pipe->txhead[4], pipe->proto); - NNI_PUT16(&pipe->txhead[6], 0); - - nni_aio_init(&aio, NULL, NULL); - - nni_mtx_lock(&pipe->mtx); - pipe->user_negaio = &aio; - pipe->gotrxhead = 0; - pipe->gottxhead = 0; - pipe->wantrxhead = 8; - pipe->wanttxhead = 8; - pipe->negaio.a_niov = 1; - pipe->negaio.a_iov[0].iov_len = 8; - pipe->negaio.a_iov[0].iov_buf = &pipe->txhead[0]; - rv = nni_aio_start(&aio, nni_ipc_cancel_nego, pipe); - if (rv != 0) { - nni_mtx_unlock(&pipe->mtx); - return (NNG_ECLOSED); - } - nni_plat_ipc_send(pipe->isp, &pipe->negaio); - nni_mtx_unlock(&pipe->mtx); - - nni_aio_wait(&aio); - rv = nni_aio_result(&aio); - nni_aio_fini(&aio); - NNI_ASSERT(pipe->user_negaio == NULL); - - return (rv); -} - - -static int nni_ipc_ep_connect_sync(void *arg, void **pipep) { nni_ipc_ep *ep = arg; @@ -557,10 +546,6 @@ nni_ipc_ep_connect_sync(void *arg, void **pipep) return (rv); } - if ((rv = nni_ipc_negotiate(pipe)) != 0) { - nni_ipc_pipe_fini(pipe); - return (rv); - } *pipep = pipe; return (0); } @@ -600,10 +585,6 @@ nni_ipc_ep_accept_sync(void *arg, void **pipep) nni_ipc_pipe_fini(pipe); return (rv); } - if ((rv = nni_ipc_negotiate(pipe)) != 0) { - nni_ipc_pipe_fini(pipe); - return (rv); - } *pipep = pipe; return (0); } @@ -611,6 +592,7 @@ nni_ipc_ep_accept_sync(void *arg, void **pipep) static nni_tran_pipe nni_ipc_pipe_ops = { .p_fini = nni_ipc_pipe_fini, + .p_start = nni_ipc_pipe_start, .p_send = nni_ipc_pipe_send, .p_recv = nni_ipc_pipe_recv, .p_close = nni_ipc_pipe_close, |
