aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-07-05 15:51:29 -0700
committerGarrett D'Amore <garrett@damore.org>2017-07-05 15:51:29 -0700
commit372d03889b1398016056d1641d31705b75d97c9a (patch)
treee26d06ee7e07820e8da08e9259832b2ce4180934 /src
parent0b08b59786e4af19f57bf00ebc20ee8f96453679 (diff)
downloadnng-372d03889b1398016056d1641d31705b75d97c9a.tar.gz
nng-372d03889b1398016056d1641d31705b75d97c9a.tar.bz2
nng-372d03889b1398016056d1641d31705b75d97c9a.zip
Move IPC negotiation out of connect/accept.
This prevents a slow partner from blocking new connections from being established on the server. Before this a single partner could cause the server to block waiting to complete the negotiation.
Diffstat (limited to 'src')
-rw-r--r--src/core/pipe.c31
-rw-r--r--src/core/pipe.h1
-rw-r--r--src/transport/ipc/ipc.c86
3 files changed, 65 insertions, 53 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 08d3dee7..eaf4fa1c 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -53,6 +53,7 @@ nni_pipe_dtor(void *ptr)
p->p_tran_ops.p_fini(p->p_tran_data);
}
+ nni_aio_fini(&p->p_start_aio);
nni_mtx_fini(&p->p_mtx);
NNI_FREE_STRUCT(p);
}
@@ -170,6 +171,28 @@ nni_pipe_peer(nni_pipe *p)
}
+static void
+nni_pipe_start_cb(void *arg)
+{
+ nni_pipe *p = arg;
+ nni_aio *aio = &p->p_start_aio;
+ int rv;
+
+ nni_mtx_lock(&p->p_mtx);
+ if ((rv = nni_aio_result(aio)) != 0) {
+ nni_mtx_unlock(&p->p_mtx);
+ nni_pipe_stop(p);
+ return;
+ }
+
+ nni_mtx_unlock(&p->p_mtx);
+
+ if ((rv = nni_sock_pipe_ready(p->p_sock, p)) != 0) {
+ nni_pipe_stop(p);
+ }
+}
+
+
int
nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran)
{
@@ -182,6 +205,10 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran)
if (rv != 0) {
return (rv);
}
+ if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) != 0) {
+ nni_objhash_unref(nni_pipes, p->p_id);
+ return (rv);
+ }
p->p_sock = sock;
p->p_ep = ep;
@@ -225,10 +252,12 @@ nni_pipe_start(nni_pipe *p)
{
int rv;
- if ((rv = nni_sock_pipe_ready(p->p_sock, p)) != 0) {
+ if (p->p_tran_ops.p_start == NULL) {
+ rv = nni_sock_pipe_ready(p->p_sock, p);
return (rv);
}
+ p->p_tran_ops.p_start(p->p_tran_data, &p->p_start_aio);
// XXX: Publish event
return (0);
diff --git a/src/core/pipe.h b/src/core/pipe.h
index e8e6ecad..f7df8232 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -32,6 +32,7 @@ struct nni_pipe {
int p_stop;
nni_mtx p_mtx;
nni_taskq_ent p_reap_tqe;
+ nni_aio p_start_aio;
};
extern int nni_pipe_sys_init(void);
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 7a486b4d..6ad9da75 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -140,7 +140,7 @@ fail:
static void
-nni_ipc_cancel_nego(nni_aio *aio)
+nni_ipc_cancel_start(nni_aio *aio)
{
nni_ipc_pipe *pipe = aio->a_prov_data;
@@ -411,6 +411,38 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio)
}
+static void
+nni_ipc_pipe_start(void *arg, nni_aio *aio)
+{
+ nni_ipc_pipe *pipe = arg;
+ int rv;
+
+ nni_mtx_lock(&pipe->mtx);
+ pipe->txhead[0] = 0;
+ pipe->txhead[1] = 'S';
+ pipe->txhead[2] = 'P';
+ pipe->txhead[3] = 0;
+ NNI_PUT16(&pipe->txhead[4], pipe->proto);
+ NNI_PUT16(&pipe->txhead[6], 0);
+
+ pipe->user_negaio = aio;
+ pipe->gotrxhead = 0;
+ pipe->gottxhead = 0;
+ pipe->wantrxhead = 8;
+ pipe->wanttxhead = 8;
+ pipe->negaio.a_niov = 1;
+ pipe->negaio.a_iov[0].iov_len = 8;
+ pipe->negaio.a_iov[0].iov_buf = &pipe->txhead[0];
+ rv = nni_aio_start(aio, nni_ipc_cancel_start, pipe);
+ if (rv != 0) {
+ nni_mtx_unlock(&pipe->mtx);
+ return;
+ }
+ nni_plat_ipc_send(pipe->isp, &pipe->negaio);
+ nni_mtx_unlock(&pipe->mtx);
+}
+
+
static uint16_t
nni_ipc_pipe_peer(void *arg)
{
@@ -491,49 +523,6 @@ nni_ipc_ep_close(void *arg)
static int
-nni_ipc_negotiate(nni_ipc_pipe *pipe)
-{
- int rv;
- nni_iov iov;
- uint8_t buf[8];
- nni_aio aio;
-
- pipe->txhead[0] = 0;
- pipe->txhead[1] = 'S';
- pipe->txhead[2] = 'P';
- pipe->txhead[3] = 0;
- NNI_PUT16(&pipe->txhead[4], pipe->proto);
- NNI_PUT16(&pipe->txhead[6], 0);
-
- nni_aio_init(&aio, NULL, NULL);
-
- nni_mtx_lock(&pipe->mtx);
- pipe->user_negaio = &aio;
- pipe->gotrxhead = 0;
- pipe->gottxhead = 0;
- pipe->wantrxhead = 8;
- pipe->wanttxhead = 8;
- pipe->negaio.a_niov = 1;
- pipe->negaio.a_iov[0].iov_len = 8;
- pipe->negaio.a_iov[0].iov_buf = &pipe->txhead[0];
- rv = nni_aio_start(&aio, nni_ipc_cancel_nego, pipe);
- if (rv != 0) {
- nni_mtx_unlock(&pipe->mtx);
- return (NNG_ECLOSED);
- }
- nni_plat_ipc_send(pipe->isp, &pipe->negaio);
- nni_mtx_unlock(&pipe->mtx);
-
- nni_aio_wait(&aio);
- rv = nni_aio_result(&aio);
- nni_aio_fini(&aio);
- NNI_ASSERT(pipe->user_negaio == NULL);
-
- return (rv);
-}
-
-
-static int
nni_ipc_ep_connect_sync(void *arg, void **pipep)
{
nni_ipc_ep *ep = arg;
@@ -557,10 +546,6 @@ nni_ipc_ep_connect_sync(void *arg, void **pipep)
return (rv);
}
- if ((rv = nni_ipc_negotiate(pipe)) != 0) {
- nni_ipc_pipe_fini(pipe);
- return (rv);
- }
*pipep = pipe;
return (0);
}
@@ -600,10 +585,6 @@ nni_ipc_ep_accept_sync(void *arg, void **pipep)
nni_ipc_pipe_fini(pipe);
return (rv);
}
- if ((rv = nni_ipc_negotiate(pipe)) != 0) {
- nni_ipc_pipe_fini(pipe);
- return (rv);
- }
*pipep = pipe;
return (0);
}
@@ -611,6 +592,7 @@ nni_ipc_ep_accept_sync(void *arg, void **pipep)
static nni_tran_pipe nni_ipc_pipe_ops = {
.p_fini = nni_ipc_pipe_fini,
+ .p_start = nni_ipc_pipe_start,
.p_send = nni_ipc_pipe_send,
.p_recv = nni_ipc_pipe_recv,
.p_close = nni_ipc_pipe_close,