aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-07-02 12:58:53 -0700
committerGarrett D'Amore <garrett@damore.org>2017-07-02 12:58:53 -0700
commit424c2238c97d26d8d5fb30fb1449a96396269da0 (patch)
treea5d3ed01e348f4644b06d630a3e92997944152f6
parent53a22a96d49e8b44c7e70b59559db87f57158a82 (diff)
downloadnng-424c2238c97d26d8d5fb30fb1449a96396269da0.tar.gz
nng-424c2238c97d26d8d5fb30fb1449a96396269da0.tar.bz2
nng-424c2238c97d26d8d5fb30fb1449a96396269da0.zip
Transports allocate their pipe structures during connect & accept.
-rw-r--r--src/core/endpt.c4
-rw-r--r--src/core/pipe.c2
-rw-r--r--src/core/transport.h15
-rw-r--r--src/transport/inproc/inproc.c31
-rw-r--r--src/transport/ipc/ipc.c26
-rw-r--r--src/transport/tcp/tcp.c26
6 files changed, 68 insertions, 36 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index e3f78ecd..223d86a3 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -245,7 +245,7 @@ nni_ep_connect(nni_ep *ep)
if ((rv = nni_pipe_create(&pipe, ep, ep->ep_sock, ep->ep_tran)) != 0) {
return (rv);
}
- rv = ep->ep_ops.ep_connect(ep->ep_data, pipe->p_tran_data);
+ rv = ep->ep_ops.ep_connect(ep->ep_data, &pipe->p_tran_data);
if (rv != 0) {
nni_pipe_remove(pipe);
return (rv);
@@ -411,7 +411,7 @@ nni_ep_accept(nni_ep *ep)
if ((rv = nni_pipe_create(&pipe, ep, ep->ep_sock, ep->ep_tran)) != 0) {
return (rv);
}
- rv = ep->ep_ops.ep_accept(ep->ep_data, pipe->p_tran_data);
+ rv = ep->ep_ops.ep_accept(ep->ep_data, &pipe->p_tran_data);
if (rv != 0) {
nni_pipe_remove(pipe);
return (rv);
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 791ba4aa..ebf7adef 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -192,11 +192,13 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran)
// Save the protocol destructor.
p->p_proto_dtor = sock->s_pipe_ops.pipe_fini;
+#if 0
// Initialize the transport pipe data.
if ((rv = p->p_tran_ops.p_init(&p->p_tran_data)) != 0) {
nni_objhash_unref(nni_pipes, p->p_id);
return (rv);
}
+#endif
// Initialize protocol pipe data.
rv = sock->s_pipe_ops.pipe_init(&p->p_proto_data, p, sock->s_data);
diff --git a/src/core/transport.h b/src/core/transport.h
index d965201a..0ec7310a 100644
--- a/src/core/transport.h
+++ b/src/core/transport.h
@@ -49,9 +49,9 @@ struct nni_tran_ep {
// ep_connect establishes a connection. It can return errors
// NNG_EACCESS, NNG_ECONNREFUSED, NNG_EBADADDR, NNG_ECONNFAILED,
// NNG_ETIMEDOUT, and NNG_EPROTO. The first argument is the
- // transport specific endpoint, and the second is the transport
- // specific pipe structure.
- int (*ep_connect)(void *, void *);
+ // transport specific endpoint, and the second is a pointer to
+ // receive a newly created transport-specific pipe structure.
+ int (*ep_connect)(void *, void **);
// ep_bind just does the bind() and listen() work,
// reserving the address but not creating any connections.
@@ -61,9 +61,9 @@ struct nni_tran_ep {
int (*ep_bind)(void *);
// ep_accept accepts an inbound connection. The first argument
- // is the transport-specific endpoint, and the second is the
- // transport-specific pipe (which will have already been created.)
- int (*ep_accept)(void *, void *);
+ // is the transport-specific endpoint, and the second is a pointer to
+ // a transport-specific pipe, created by this function.
+ int (*ep_accept)(void *, void **);
// ep_close stops the endpoint from operating altogether. It does
// not affect pipes that have already been created.
@@ -81,9 +81,6 @@ struct nni_tran_ep {
// back into the socket at this point. (Which is one reason pointers back
// to socket or even enclosing pipe state, are not provided.)
struct nni_tran_pipe {
- // p_init initializes the pipe structure, allocating the structure.
- int (*p_init)(void **);
-
// p_fini destroys the pipe. This should clean up all local
// resources, including closing files and freeing memory, used by
// the pipe. After this call returns, the system will not make
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 9a39208d..0ff87548 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -113,14 +113,14 @@ nni_inproc_pair_destroy(nni_inproc_pair *pair)
static int
-nni_inproc_pipe_init(void **argp)
+nni_inproc_pipe_init(nni_inproc_pipe **pipep)
{
nni_inproc_pipe *pipe;
if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
return (NNG_ENOMEM);
}
- *argp = pipe;
+ *pipep = pipe;
return (0);
}
@@ -131,8 +131,6 @@ nni_inproc_pipe_fini(void *arg)
nni_inproc_pipe *pipe = arg;
nni_inproc_pair *pair;
- // We could assert the pipe closed...
-
if ((pair = pipe->pair) != NULL) {
// If we are the last peer, then toss the pair structure.
nni_mtx_lock(&pair->mx);
@@ -288,15 +286,21 @@ nni_inproc_ep_close(void *arg)
static int
-nni_inproc_ep_connect(void *arg, void *pipearg)
+nni_inproc_ep_connect(void *arg, void **pipep)
{
- nni_inproc_pipe *pipe = pipearg;
+ nni_inproc_pipe *pipe;
nni_inproc_ep *ep = arg;
nni_inproc_ep *server;
+ int rv;
if (ep->mode != NNI_INPROC_EP_IDLE) {
return (NNG_EINVAL);
}
+
+ if ((rv = nni_inproc_pipe_init(&pipe)) != 0) {
+ return (rv);
+ }
+
nni_mtx_lock(&nni_inproc.mx);
// Find a server.
@@ -310,6 +314,7 @@ nni_inproc_ep_connect(void *arg, void *pipearg)
}
if (server == NULL) {
nni_mtx_unlock(&nni_inproc.mx);
+ nni_inproc_pipe_fini(pipe);
return (NNG_ECONNREFUSED);
}
@@ -321,6 +326,7 @@ nni_inproc_ep_connect(void *arg, void *pipearg)
if (ep->closed) {
nni_list_remove(&server->clients, ep);
nni_mtx_unlock(&nni_inproc.mx);
+ nni_inproc_pipe_fini(pipe);
return (NNG_ECLOSED);
}
nni_cv_wake(&server->cv);
@@ -333,10 +339,11 @@ nni_inproc_ep_connect(void *arg, void *pipearg)
// still be set, indicating server shutdown.
if (ep->cpipe != NULL) {
nni_mtx_unlock(&nni_inproc.mx);
+ nni_inproc_pipe_fini(pipe);
return (NNG_ECONNRESET);
}
-
nni_mtx_unlock(&nni_inproc.mx);
+ *pipep = pipe;
return (0);
}
@@ -373,13 +380,13 @@ nni_inproc_ep_bind(void *arg)
static int
-nni_inproc_ep_accept(void *arg, void *pipearg)
+nni_inproc_ep_accept(void *arg, void **pipep)
{
nni_inproc_ep *ep = arg;
- nni_inproc_pipe *pipe = pipearg;
nni_inproc_ep *client;
nni_inproc_pair *pair;
int rv;
+ nni_inproc_pipe *pipe;
if (ep->mode != NNI_INPROC_EP_LISTEN) {
return (NNG_EINVAL);
@@ -394,7 +401,8 @@ nni_inproc_ep_accept(void *arg, void *pipearg)
return (rv);
}
if (((rv = nni_msgq_init(&pair->q[0], 4)) != 0) ||
- ((rv = nni_msgq_init(&pair->q[1], 4)) != 0)) {
+ ((rv = nni_msgq_init(&pair->q[1], 4)) != 0) ||
+ ((rv = nni_inproc_pipe_init(&pipe)) != 0)) {
nni_inproc_pair_destroy(pair);
return (rv);
}
@@ -406,6 +414,7 @@ nni_inproc_ep_accept(void *arg, void *pipearg)
// time we acquired the lock.
nni_mtx_unlock(&nni_inproc.mx);
nni_inproc_pair_destroy(pair);
+ nni_inproc_pipe_fini(pipe);
return (NNG_ECLOSED);
}
if ((client = nni_list_first(&ep->clients)) != NULL) {
@@ -429,6 +438,7 @@ nni_inproc_ep_accept(void *arg, void *pipearg)
client->cpipe = NULL;
nni_cv_wake(&client->cv);
+ *pipep = pipe;
nni_mtx_unlock(&nni_inproc.mx);
return (0);
@@ -436,7 +446,6 @@ nni_inproc_ep_accept(void *arg, void *pipearg)
static nni_tran_pipe nni_inproc_pipe_ops = {
- .p_init = nni_inproc_pipe_init,
.p_fini = nni_inproc_pipe_fini,
.p_aio_send = nni_inproc_pipe_aio_send,
.p_aio_recv = nni_inproc_pipe_aio_recv,
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 7727b375..8cb24b28 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -92,7 +92,7 @@ nni_ipc_pipe_fini(void *arg)
static int
-nni_ipc_pipe_init(void **argp)
+nni_ipc_pipe_init(nni_ipc_pipe **pipep)
{
nni_ipc_pipe *pipe;
int rv;
@@ -115,7 +115,7 @@ nni_ipc_pipe_init(void **argp)
if (rv != 0) {
goto fail;
}
- *argp = pipe;
+ *pipep = pipe;
return (0);
fail:
@@ -443,10 +443,10 @@ nni_ipc_negotiate(nni_ipc_pipe *pipe)
static int
-nni_ipc_ep_connect(void *arg, void *pipearg)
+nni_ipc_ep_connect(void *arg, void **pipep)
{
nni_ipc_ep *ep = arg;
- nni_ipc_pipe *pipe = pipearg;
+ nni_ipc_pipe *pipe;
int rv;
const char *path;
@@ -455,17 +455,24 @@ nni_ipc_ep_connect(void *arg, void *pipearg)
}
path = ep->addr + strlen("ipc://");
+ if ((rv = nni_ipc_pipe_init(&pipe)) != 0) {
+ return (rv);
+ }
+
pipe->proto = ep->proto;
pipe->rcvmax = ep->rcvmax;
rv = nni_plat_ipc_connect(pipe->isp, path);
if (rv != 0) {
+ nni_ipc_pipe_fini(pipe);
return (rv);
}
if ((rv = nni_ipc_negotiate(pipe)) != 0) {
+ nni_ipc_pipe_fini(pipe);
return (rv);
}
+ *pipep = pipe;
return (0);
}
@@ -491,27 +498,32 @@ nni_ipc_ep_bind(void *arg)
static int
-nni_ipc_ep_accept(void *arg, void *pipearg)
+nni_ipc_ep_accept(void *arg, void **pipep)
{
nni_ipc_ep *ep = arg;
- nni_ipc_pipe *pipe = pipearg;
+ nni_ipc_pipe *pipe;
int rv;
+ if ((rv = nni_ipc_pipe_init(&pipe)) != 0) {
+ return (rv);
+ }
pipe->proto = ep->proto;
pipe->rcvmax = ep->rcvmax;
if ((rv = nni_plat_ipc_accept(pipe->isp, ep->isp)) != 0) {
+ nni_ipc_pipe_fini(pipe);
return (rv);
}
if ((rv = nni_ipc_negotiate(pipe)) != 0) {
+ nni_ipc_pipe_fini(pipe);
return (rv);
}
+ *pipep = pipe;
return (0);
}
static nni_tran_pipe nni_ipc_pipe_ops = {
- .p_init = nni_ipc_pipe_init,
.p_fini = nni_ipc_pipe_fini,
.p_aio_send = nni_ipc_pipe_aio_send,
.p_aio_recv = nni_ipc_pipe_aio_recv,
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 4aecef65..d366a496 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -92,7 +92,7 @@ nni_tcp_pipe_fini(void *arg)
static int
-nni_tcp_pipe_init(void **argp)
+nni_tcp_pipe_init(nni_tcp_pipe **pipep)
{
nni_tcp_pipe *pipe;
int rv;
@@ -114,7 +114,7 @@ nni_tcp_pipe_init(void **argp)
if (rv != 0) {
goto fail;
}
- *argp = pipe;
+ *pipep = pipe;
return (0);
fail:
@@ -484,10 +484,10 @@ nni_tcp_negotiate(nni_tcp_pipe *pipe)
static int
-nni_tcp_ep_connect(void *arg, void *pipearg)
+nni_tcp_ep_connect(void *arg, void **pipep)
{
nni_tcp_ep *ep = arg;
- nni_tcp_pipe *pipe = pipearg;
+ nni_tcp_pipe *pipe;
char *host;
uint16_t port;
int flag;
@@ -531,6 +531,9 @@ nni_tcp_ep_connect(void *arg, void *pipearg)
return (rv);
}
+ if ((rv = nni_tcp_pipe_init(&pipe)) != 0) {
+ return (rv);
+ }
pipe->proto = ep->proto;
pipe->rcvmax = ep->rcvmax;
@@ -540,12 +543,15 @@ nni_tcp_ep_connect(void *arg, void *pipearg)
bindaddr = lclpart == NULL ? NULL : &lcladdr;
rv = nni_plat_tcp_connect(pipe->tsp, &remaddr, bindaddr);
if (rv != 0) {
+ nni_tcp_pipe_fini(pipe);
return (rv);
}
if ((rv = nni_tcp_negotiate(pipe)) != 0) {
+ nni_tcp_pipe_fini(pipe);
return (rv);
}
+ *pipep = pipe;
return (0);
}
@@ -582,27 +588,33 @@ nni_tcp_ep_bind(void *arg)
static int
-nni_tcp_ep_accept(void *arg, void *pipearg)
+nni_tcp_ep_accept(void *arg, void **pipep)
{
nni_tcp_ep *ep = arg;
- nni_tcp_pipe *pipe = pipearg;
+ nni_tcp_pipe *pipe;
int rv;
+ if ((rv = nni_tcp_pipe_init(&pipe)) != 0) {
+ return (rv);
+ }
pipe->proto = ep->proto;
pipe->rcvmax = ep->rcvmax;
+
if ((rv = nni_plat_tcp_accept(pipe->tsp, ep->tsp)) != 0) {
+ nni_tcp_pipe_fini(pipe);
return (rv);
}
if ((rv = nni_tcp_negotiate(pipe)) != 0) {
+ nni_tcp_pipe_fini(pipe);
return (rv);
}
+ *pipep = pipe;
return (0);
}
static nni_tran_pipe nni_tcp_pipe_ops = {
- .p_init = nni_tcp_pipe_init,
.p_fini = nni_tcp_pipe_fini,
.p_aio_send = nni_tcp_pipe_aio_send,
.p_aio_recv = nni_tcp_pipe_aio_recv,