aboutsummaryrefslogtreecommitdiff
path: root/src/transport
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport')
-rw-r--r--src/transport/inproc/inproc.c31
-rw-r--r--src/transport/ipc/ipc.c26
-rw-r--r--src/transport/tcp/tcp.c26
3 files changed, 58 insertions, 25 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 9a39208d..0ff87548 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -113,14 +113,14 @@ nni_inproc_pair_destroy(nni_inproc_pair *pair)
static int
-nni_inproc_pipe_init(void **argp)
+nni_inproc_pipe_init(nni_inproc_pipe **pipep)
{
nni_inproc_pipe *pipe;
if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
return (NNG_ENOMEM);
}
- *argp = pipe;
+ *pipep = pipe;
return (0);
}
@@ -131,8 +131,6 @@ nni_inproc_pipe_fini(void *arg)
nni_inproc_pipe *pipe = arg;
nni_inproc_pair *pair;
- // We could assert the pipe closed...
-
if ((pair = pipe->pair) != NULL) {
// If we are the last peer, then toss the pair structure.
nni_mtx_lock(&pair->mx);
@@ -288,15 +286,21 @@ nni_inproc_ep_close(void *arg)
static int
-nni_inproc_ep_connect(void *arg, void *pipearg)
+nni_inproc_ep_connect(void *arg, void **pipep)
{
- nni_inproc_pipe *pipe = pipearg;
+ nni_inproc_pipe *pipe;
nni_inproc_ep *ep = arg;
nni_inproc_ep *server;
+ int rv;
if (ep->mode != NNI_INPROC_EP_IDLE) {
return (NNG_EINVAL);
}
+
+ if ((rv = nni_inproc_pipe_init(&pipe)) != 0) {
+ return (rv);
+ }
+
nni_mtx_lock(&nni_inproc.mx);
// Find a server.
@@ -310,6 +314,7 @@ nni_inproc_ep_connect(void *arg, void *pipearg)
}
if (server == NULL) {
nni_mtx_unlock(&nni_inproc.mx);
+ nni_inproc_pipe_fini(pipe);
return (NNG_ECONNREFUSED);
}
@@ -321,6 +326,7 @@ nni_inproc_ep_connect(void *arg, void *pipearg)
if (ep->closed) {
nni_list_remove(&server->clients, ep);
nni_mtx_unlock(&nni_inproc.mx);
+ nni_inproc_pipe_fini(pipe);
return (NNG_ECLOSED);
}
nni_cv_wake(&server->cv);
@@ -333,10 +339,11 @@ nni_inproc_ep_connect(void *arg, void *pipearg)
// still be set, indicating server shutdown.
if (ep->cpipe != NULL) {
nni_mtx_unlock(&nni_inproc.mx);
+ nni_inproc_pipe_fini(pipe);
return (NNG_ECONNRESET);
}
-
nni_mtx_unlock(&nni_inproc.mx);
+ *pipep = pipe;
return (0);
}
@@ -373,13 +380,13 @@ nni_inproc_ep_bind(void *arg)
static int
-nni_inproc_ep_accept(void *arg, void *pipearg)
+nni_inproc_ep_accept(void *arg, void **pipep)
{
nni_inproc_ep *ep = arg;
- nni_inproc_pipe *pipe = pipearg;
nni_inproc_ep *client;
nni_inproc_pair *pair;
int rv;
+ nni_inproc_pipe *pipe;
if (ep->mode != NNI_INPROC_EP_LISTEN) {
return (NNG_EINVAL);
@@ -394,7 +401,8 @@ nni_inproc_ep_accept(void *arg, void *pipearg)
return (rv);
}
if (((rv = nni_msgq_init(&pair->q[0], 4)) != 0) ||
- ((rv = nni_msgq_init(&pair->q[1], 4)) != 0)) {
+ ((rv = nni_msgq_init(&pair->q[1], 4)) != 0) ||
+ ((rv = nni_inproc_pipe_init(&pipe)) != 0)) {
nni_inproc_pair_destroy(pair);
return (rv);
}
@@ -406,6 +414,7 @@ nni_inproc_ep_accept(void *arg, void *pipearg)
// time we acquired the lock.
nni_mtx_unlock(&nni_inproc.mx);
nni_inproc_pair_destroy(pair);
+ nni_inproc_pipe_fini(pipe);
return (NNG_ECLOSED);
}
if ((client = nni_list_first(&ep->clients)) != NULL) {
@@ -429,6 +438,7 @@ nni_inproc_ep_accept(void *arg, void *pipearg)
client->cpipe = NULL;
nni_cv_wake(&client->cv);
+ *pipep = pipe;
nni_mtx_unlock(&nni_inproc.mx);
return (0);
@@ -436,7 +446,6 @@ nni_inproc_ep_accept(void *arg, void *pipearg)
static nni_tran_pipe nni_inproc_pipe_ops = {
- .p_init = nni_inproc_pipe_init,
.p_fini = nni_inproc_pipe_fini,
.p_aio_send = nni_inproc_pipe_aio_send,
.p_aio_recv = nni_inproc_pipe_aio_recv,
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 7727b375..8cb24b28 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -92,7 +92,7 @@ nni_ipc_pipe_fini(void *arg)
static int
-nni_ipc_pipe_init(void **argp)
+nni_ipc_pipe_init(nni_ipc_pipe **pipep)
{
nni_ipc_pipe *pipe;
int rv;
@@ -115,7 +115,7 @@ nni_ipc_pipe_init(void **argp)
if (rv != 0) {
goto fail;
}
- *argp = pipe;
+ *pipep = pipe;
return (0);
fail:
@@ -443,10 +443,10 @@ nni_ipc_negotiate(nni_ipc_pipe *pipe)
static int
-nni_ipc_ep_connect(void *arg, void *pipearg)
+nni_ipc_ep_connect(void *arg, void **pipep)
{
nni_ipc_ep *ep = arg;
- nni_ipc_pipe *pipe = pipearg;
+ nni_ipc_pipe *pipe;
int rv;
const char *path;
@@ -455,17 +455,24 @@ nni_ipc_ep_connect(void *arg, void *pipearg)
}
path = ep->addr + strlen("ipc://");
+ if ((rv = nni_ipc_pipe_init(&pipe)) != 0) {
+ return (rv);
+ }
+
pipe->proto = ep->proto;
pipe->rcvmax = ep->rcvmax;
rv = nni_plat_ipc_connect(pipe->isp, path);
if (rv != 0) {
+ nni_ipc_pipe_fini(pipe);
return (rv);
}
if ((rv = nni_ipc_negotiate(pipe)) != 0) {
+ nni_ipc_pipe_fini(pipe);
return (rv);
}
+ *pipep = pipe;
return (0);
}
@@ -491,27 +498,32 @@ nni_ipc_ep_bind(void *arg)
static int
-nni_ipc_ep_accept(void *arg, void *pipearg)
+nni_ipc_ep_accept(void *arg, void **pipep)
{
nni_ipc_ep *ep = arg;
- nni_ipc_pipe *pipe = pipearg;
+ nni_ipc_pipe *pipe;
int rv;
+ if ((rv = nni_ipc_pipe_init(&pipe)) != 0) {
+ return (rv);
+ }
pipe->proto = ep->proto;
pipe->rcvmax = ep->rcvmax;
if ((rv = nni_plat_ipc_accept(pipe->isp, ep->isp)) != 0) {
+ nni_ipc_pipe_fini(pipe);
return (rv);
}
if ((rv = nni_ipc_negotiate(pipe)) != 0) {
+ nni_ipc_pipe_fini(pipe);
return (rv);
}
+ *pipep = pipe;
return (0);
}
static nni_tran_pipe nni_ipc_pipe_ops = {
- .p_init = nni_ipc_pipe_init,
.p_fini = nni_ipc_pipe_fini,
.p_aio_send = nni_ipc_pipe_aio_send,
.p_aio_recv = nni_ipc_pipe_aio_recv,
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 4aecef65..d366a496 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -92,7 +92,7 @@ nni_tcp_pipe_fini(void *arg)
static int
-nni_tcp_pipe_init(void **argp)
+nni_tcp_pipe_init(nni_tcp_pipe **pipep)
{
nni_tcp_pipe *pipe;
int rv;
@@ -114,7 +114,7 @@ nni_tcp_pipe_init(void **argp)
if (rv != 0) {
goto fail;
}
- *argp = pipe;
+ *pipep = pipe;
return (0);
fail:
@@ -484,10 +484,10 @@ nni_tcp_negotiate(nni_tcp_pipe *pipe)
static int
-nni_tcp_ep_connect(void *arg, void *pipearg)
+nni_tcp_ep_connect(void *arg, void **pipep)
{
nni_tcp_ep *ep = arg;
- nni_tcp_pipe *pipe = pipearg;
+ nni_tcp_pipe *pipe;
char *host;
uint16_t port;
int flag;
@@ -531,6 +531,9 @@ nni_tcp_ep_connect(void *arg, void *pipearg)
return (rv);
}
+ if ((rv = nni_tcp_pipe_init(&pipe)) != 0) {
+ return (rv);
+ }
pipe->proto = ep->proto;
pipe->rcvmax = ep->rcvmax;
@@ -540,12 +543,15 @@ nni_tcp_ep_connect(void *arg, void *pipearg)
bindaddr = lclpart == NULL ? NULL : &lcladdr;
rv = nni_plat_tcp_connect(pipe->tsp, &remaddr, bindaddr);
if (rv != 0) {
+ nni_tcp_pipe_fini(pipe);
return (rv);
}
if ((rv = nni_tcp_negotiate(pipe)) != 0) {
+ nni_tcp_pipe_fini(pipe);
return (rv);
}
+ *pipep = pipe;
return (0);
}
@@ -582,27 +588,33 @@ nni_tcp_ep_bind(void *arg)
static int
-nni_tcp_ep_accept(void *arg, void *pipearg)
+nni_tcp_ep_accept(void *arg, void **pipep)
{
nni_tcp_ep *ep = arg;
- nni_tcp_pipe *pipe = pipearg;
+ nni_tcp_pipe *pipe;
int rv;
+ if ((rv = nni_tcp_pipe_init(&pipe)) != 0) {
+ return (rv);
+ }
pipe->proto = ep->proto;
pipe->rcvmax = ep->rcvmax;
+
if ((rv = nni_plat_tcp_accept(pipe->tsp, ep->tsp)) != 0) {
+ nni_tcp_pipe_fini(pipe);
return (rv);
}
if ((rv = nni_tcp_negotiate(pipe)) != 0) {
+ nni_tcp_pipe_fini(pipe);
return (rv);
}
+ *pipep = pipe;
return (0);
}
static nni_tran_pipe nni_tcp_pipe_ops = {
- .p_init = nni_tcp_pipe_init,
.p_fini = nni_tcp_pipe_fini,
.p_aio_send = nni_tcp_pipe_aio_send,
.p_aio_recv = nni_tcp_pipe_aio_recv,