aboutsummaryrefslogtreecommitdiff
path: root/src/transport/inproc
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/inproc')
-rw-r--r--src/transport/inproc/inproc.c31
1 files changed, 20 insertions, 11 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 9a39208d..0ff87548 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -113,14 +113,14 @@ nni_inproc_pair_destroy(nni_inproc_pair *pair)
static int
-nni_inproc_pipe_init(void **argp)
+nni_inproc_pipe_init(nni_inproc_pipe **pipep)
{
nni_inproc_pipe *pipe;
if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
return (NNG_ENOMEM);
}
- *argp = pipe;
+ *pipep = pipe;
return (0);
}
@@ -131,8 +131,6 @@ nni_inproc_pipe_fini(void *arg)
nni_inproc_pipe *pipe = arg;
nni_inproc_pair *pair;
- // We could assert the pipe closed...
-
if ((pair = pipe->pair) != NULL) {
// If we are the last peer, then toss the pair structure.
nni_mtx_lock(&pair->mx);
@@ -288,15 +286,21 @@ nni_inproc_ep_close(void *arg)
static int
-nni_inproc_ep_connect(void *arg, void *pipearg)
+nni_inproc_ep_connect(void *arg, void **pipep)
{
- nni_inproc_pipe *pipe = pipearg;
+ nni_inproc_pipe *pipe;
nni_inproc_ep *ep = arg;
nni_inproc_ep *server;
+ int rv;
if (ep->mode != NNI_INPROC_EP_IDLE) {
return (NNG_EINVAL);
}
+
+ if ((rv = nni_inproc_pipe_init(&pipe)) != 0) {
+ return (rv);
+ }
+
nni_mtx_lock(&nni_inproc.mx);
// Find a server.
@@ -310,6 +314,7 @@ nni_inproc_ep_connect(void *arg, void *pipearg)
}
if (server == NULL) {
nni_mtx_unlock(&nni_inproc.mx);
+ nni_inproc_pipe_fini(pipe);
return (NNG_ECONNREFUSED);
}
@@ -321,6 +326,7 @@ nni_inproc_ep_connect(void *arg, void *pipearg)
if (ep->closed) {
nni_list_remove(&server->clients, ep);
nni_mtx_unlock(&nni_inproc.mx);
+ nni_inproc_pipe_fini(pipe);
return (NNG_ECLOSED);
}
nni_cv_wake(&server->cv);
@@ -333,10 +339,11 @@ nni_inproc_ep_connect(void *arg, void *pipearg)
// still be set, indicating server shutdown.
if (ep->cpipe != NULL) {
nni_mtx_unlock(&nni_inproc.mx);
+ nni_inproc_pipe_fini(pipe);
return (NNG_ECONNRESET);
}
-
nni_mtx_unlock(&nni_inproc.mx);
+ *pipep = pipe;
return (0);
}
@@ -373,13 +380,13 @@ nni_inproc_ep_bind(void *arg)
static int
-nni_inproc_ep_accept(void *arg, void *pipearg)
+nni_inproc_ep_accept(void *arg, void **pipep)
{
nni_inproc_ep *ep = arg;
- nni_inproc_pipe *pipe = pipearg;
nni_inproc_ep *client;
nni_inproc_pair *pair;
int rv;
+ nni_inproc_pipe *pipe;
if (ep->mode != NNI_INPROC_EP_LISTEN) {
return (NNG_EINVAL);
@@ -394,7 +401,8 @@ nni_inproc_ep_accept(void *arg, void *pipearg)
return (rv);
}
if (((rv = nni_msgq_init(&pair->q[0], 4)) != 0) ||
- ((rv = nni_msgq_init(&pair->q[1], 4)) != 0)) {
+ ((rv = nni_msgq_init(&pair->q[1], 4)) != 0) ||
+ ((rv = nni_inproc_pipe_init(&pipe)) != 0)) {
nni_inproc_pair_destroy(pair);
return (rv);
}
@@ -406,6 +414,7 @@ nni_inproc_ep_accept(void *arg, void *pipearg)
// time we acquired the lock.
nni_mtx_unlock(&nni_inproc.mx);
nni_inproc_pair_destroy(pair);
+ nni_inproc_pipe_fini(pipe);
return (NNG_ECLOSED);
}
if ((client = nni_list_first(&ep->clients)) != NULL) {
@@ -429,6 +438,7 @@ nni_inproc_ep_accept(void *arg, void *pipearg)
client->cpipe = NULL;
nni_cv_wake(&client->cv);
+ *pipep = pipe;
nni_mtx_unlock(&nni_inproc.mx);
return (0);
@@ -436,7 +446,6 @@ nni_inproc_ep_accept(void *arg, void *pipearg)
static nni_tran_pipe nni_inproc_pipe_ops = {
- .p_init = nni_inproc_pipe_init,
.p_fini = nni_inproc_pipe_fini,
.p_aio_send = nni_inproc_pipe_aio_send,
.p_aio_recv = nni_inproc_pipe_aio_recv,