diff options
| -rw-r--r-- | src/core/endpt.c | 4 | ||||
| -rw-r--r-- | src/core/pipe.c | 2 | ||||
| -rw-r--r-- | src/core/transport.h | 15 | ||||
| -rw-r--r-- | src/transport/inproc/inproc.c | 31 | ||||
| -rw-r--r-- | src/transport/ipc/ipc.c | 26 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 26 |
6 files changed, 68 insertions, 36 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index e3f78ecd..223d86a3 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -245,7 +245,7 @@ nni_ep_connect(nni_ep *ep) if ((rv = nni_pipe_create(&pipe, ep, ep->ep_sock, ep->ep_tran)) != 0) { return (rv); } - rv = ep->ep_ops.ep_connect(ep->ep_data, pipe->p_tran_data); + rv = ep->ep_ops.ep_connect(ep->ep_data, &pipe->p_tran_data); if (rv != 0) { nni_pipe_remove(pipe); return (rv); @@ -411,7 +411,7 @@ nni_ep_accept(nni_ep *ep) if ((rv = nni_pipe_create(&pipe, ep, ep->ep_sock, ep->ep_tran)) != 0) { return (rv); } - rv = ep->ep_ops.ep_accept(ep->ep_data, pipe->p_tran_data); + rv = ep->ep_ops.ep_accept(ep->ep_data, &pipe->p_tran_data); if (rv != 0) { nni_pipe_remove(pipe); return (rv); diff --git a/src/core/pipe.c b/src/core/pipe.c index 791ba4aa..ebf7adef 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -192,11 +192,13 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran) // Save the protocol destructor. p->p_proto_dtor = sock->s_pipe_ops.pipe_fini; +#if 0 // Initialize the transport pipe data. if ((rv = p->p_tran_ops.p_init(&p->p_tran_data)) != 0) { nni_objhash_unref(nni_pipes, p->p_id); return (rv); } +#endif // Initialize protocol pipe data. rv = sock->s_pipe_ops.pipe_init(&p->p_proto_data, p, sock->s_data); diff --git a/src/core/transport.h b/src/core/transport.h index d965201a..0ec7310a 100644 --- a/src/core/transport.h +++ b/src/core/transport.h @@ -49,9 +49,9 @@ struct nni_tran_ep { // ep_connect establishes a connection. It can return errors // NNG_EACCESS, NNG_ECONNREFUSED, NNG_EBADADDR, NNG_ECONNFAILED, // NNG_ETIMEDOUT, and NNG_EPROTO. The first argument is the - // transport specific endpoint, and the second is the transport - // specific pipe structure. - int (*ep_connect)(void *, void *); + // transport specific endpoint, and the second is a pointer to + // receive a newly created transport-specific pipe structure. + int (*ep_connect)(void *, void **); // ep_bind just does the bind() and listen() work, // reserving the address but not creating any connections. @@ -61,9 +61,9 @@ struct nni_tran_ep { int (*ep_bind)(void *); // ep_accept accepts an inbound connection. The first argument - // is the transport-specific endpoint, and the second is the - // transport-specific pipe (which will have already been created.) - int (*ep_accept)(void *, void *); + // is the transport-specific endpoint, and the second is a pointer to + // a transport-specific pipe, created by this function. + int (*ep_accept)(void *, void **); // ep_close stops the endpoint from operating altogether. It does // not affect pipes that have already been created. @@ -81,9 +81,6 @@ struct nni_tran_ep { // back into the socket at this point. (Which is one reason pointers back // to socket or even enclosing pipe state, are not provided.) struct nni_tran_pipe { - // p_init initializes the pipe structure, allocating the structure. - int (*p_init)(void **); - // p_fini destroys the pipe. This should clean up all local // resources, including closing files and freeing memory, used by // the pipe. After this call returns, the system will not make diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 9a39208d..0ff87548 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -113,14 +113,14 @@ nni_inproc_pair_destroy(nni_inproc_pair *pair) static int -nni_inproc_pipe_init(void **argp) +nni_inproc_pipe_init(nni_inproc_pipe **pipep) { nni_inproc_pipe *pipe; if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { return (NNG_ENOMEM); } - *argp = pipe; + *pipep = pipe; return (0); } @@ -131,8 +131,6 @@ nni_inproc_pipe_fini(void *arg) nni_inproc_pipe *pipe = arg; nni_inproc_pair *pair; - // We could assert the pipe closed... - if ((pair = pipe->pair) != NULL) { // If we are the last peer, then toss the pair structure. nni_mtx_lock(&pair->mx); @@ -288,15 +286,21 @@ nni_inproc_ep_close(void *arg) static int -nni_inproc_ep_connect(void *arg, void *pipearg) +nni_inproc_ep_connect(void *arg, void **pipep) { - nni_inproc_pipe *pipe = pipearg; + nni_inproc_pipe *pipe; nni_inproc_ep *ep = arg; nni_inproc_ep *server; + int rv; if (ep->mode != NNI_INPROC_EP_IDLE) { return (NNG_EINVAL); } + + if ((rv = nni_inproc_pipe_init(&pipe)) != 0) { + return (rv); + } + nni_mtx_lock(&nni_inproc.mx); // Find a server. @@ -310,6 +314,7 @@ nni_inproc_ep_connect(void *arg, void *pipearg) } if (server == NULL) { nni_mtx_unlock(&nni_inproc.mx); + nni_inproc_pipe_fini(pipe); return (NNG_ECONNREFUSED); } @@ -321,6 +326,7 @@ nni_inproc_ep_connect(void *arg, void *pipearg) if (ep->closed) { nni_list_remove(&server->clients, ep); nni_mtx_unlock(&nni_inproc.mx); + nni_inproc_pipe_fini(pipe); return (NNG_ECLOSED); } nni_cv_wake(&server->cv); @@ -333,10 +339,11 @@ nni_inproc_ep_connect(void *arg, void *pipearg) // still be set, indicating server shutdown. if (ep->cpipe != NULL) { nni_mtx_unlock(&nni_inproc.mx); + nni_inproc_pipe_fini(pipe); return (NNG_ECONNRESET); } - nni_mtx_unlock(&nni_inproc.mx); + *pipep = pipe; return (0); } @@ -373,13 +380,13 @@ nni_inproc_ep_bind(void *arg) static int -nni_inproc_ep_accept(void *arg, void *pipearg) +nni_inproc_ep_accept(void *arg, void **pipep) { nni_inproc_ep *ep = arg; - nni_inproc_pipe *pipe = pipearg; nni_inproc_ep *client; nni_inproc_pair *pair; int rv; + nni_inproc_pipe *pipe; if (ep->mode != NNI_INPROC_EP_LISTEN) { return (NNG_EINVAL); @@ -394,7 +401,8 @@ nni_inproc_ep_accept(void *arg, void *pipearg) return (rv); } if (((rv = nni_msgq_init(&pair->q[0], 4)) != 0) || - ((rv = nni_msgq_init(&pair->q[1], 4)) != 0)) { + ((rv = nni_msgq_init(&pair->q[1], 4)) != 0) || + ((rv = nni_inproc_pipe_init(&pipe)) != 0)) { nni_inproc_pair_destroy(pair); return (rv); } @@ -406,6 +414,7 @@ nni_inproc_ep_accept(void *arg, void *pipearg) // time we acquired the lock. nni_mtx_unlock(&nni_inproc.mx); nni_inproc_pair_destroy(pair); + nni_inproc_pipe_fini(pipe); return (NNG_ECLOSED); } if ((client = nni_list_first(&ep->clients)) != NULL) { @@ -429,6 +438,7 @@ nni_inproc_ep_accept(void *arg, void *pipearg) client->cpipe = NULL; nni_cv_wake(&client->cv); + *pipep = pipe; nni_mtx_unlock(&nni_inproc.mx); return (0); @@ -436,7 +446,6 @@ nni_inproc_ep_accept(void *arg, void *pipearg) static nni_tran_pipe nni_inproc_pipe_ops = { - .p_init = nni_inproc_pipe_init, .p_fini = nni_inproc_pipe_fini, .p_aio_send = nni_inproc_pipe_aio_send, .p_aio_recv = nni_inproc_pipe_aio_recv, diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 7727b375..8cb24b28 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -92,7 +92,7 @@ nni_ipc_pipe_fini(void *arg) static int -nni_ipc_pipe_init(void **argp) +nni_ipc_pipe_init(nni_ipc_pipe **pipep) { nni_ipc_pipe *pipe; int rv; @@ -115,7 +115,7 @@ nni_ipc_pipe_init(void **argp) if (rv != 0) { goto fail; } - *argp = pipe; + *pipep = pipe; return (0); fail: @@ -443,10 +443,10 @@ nni_ipc_negotiate(nni_ipc_pipe *pipe) static int -nni_ipc_ep_connect(void *arg, void *pipearg) +nni_ipc_ep_connect(void *arg, void **pipep) { nni_ipc_ep *ep = arg; - nni_ipc_pipe *pipe = pipearg; + nni_ipc_pipe *pipe; int rv; const char *path; @@ -455,17 +455,24 @@ nni_ipc_ep_connect(void *arg, void *pipearg) } path = ep->addr + strlen("ipc://"); + if ((rv = nni_ipc_pipe_init(&pipe)) != 0) { + return (rv); + } + pipe->proto = ep->proto; pipe->rcvmax = ep->rcvmax; rv = nni_plat_ipc_connect(pipe->isp, path); if (rv != 0) { + nni_ipc_pipe_fini(pipe); return (rv); } if ((rv = nni_ipc_negotiate(pipe)) != 0) { + nni_ipc_pipe_fini(pipe); return (rv); } + *pipep = pipe; return (0); } @@ -491,27 +498,32 @@ nni_ipc_ep_bind(void *arg) static int -nni_ipc_ep_accept(void *arg, void *pipearg) +nni_ipc_ep_accept(void *arg, void **pipep) { nni_ipc_ep *ep = arg; - nni_ipc_pipe *pipe = pipearg; + nni_ipc_pipe *pipe; int rv; + if ((rv = nni_ipc_pipe_init(&pipe)) != 0) { + return (rv); + } pipe->proto = ep->proto; pipe->rcvmax = ep->rcvmax; if ((rv = nni_plat_ipc_accept(pipe->isp, ep->isp)) != 0) { + nni_ipc_pipe_fini(pipe); return (rv); } if ((rv = nni_ipc_negotiate(pipe)) != 0) { + nni_ipc_pipe_fini(pipe); return (rv); } + *pipep = pipe; return (0); } static nni_tran_pipe nni_ipc_pipe_ops = { - .p_init = nni_ipc_pipe_init, .p_fini = nni_ipc_pipe_fini, .p_aio_send = nni_ipc_pipe_aio_send, .p_aio_recv = nni_ipc_pipe_aio_recv, diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 4aecef65..d366a496 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -92,7 +92,7 @@ nni_tcp_pipe_fini(void *arg) static int -nni_tcp_pipe_init(void **argp) +nni_tcp_pipe_init(nni_tcp_pipe **pipep) { nni_tcp_pipe *pipe; int rv; @@ -114,7 +114,7 @@ nni_tcp_pipe_init(void **argp) if (rv != 0) { goto fail; } - *argp = pipe; + *pipep = pipe; return (0); fail: @@ -484,10 +484,10 @@ nni_tcp_negotiate(nni_tcp_pipe *pipe) static int -nni_tcp_ep_connect(void *arg, void *pipearg) +nni_tcp_ep_connect(void *arg, void **pipep) { nni_tcp_ep *ep = arg; - nni_tcp_pipe *pipe = pipearg; + nni_tcp_pipe *pipe; char *host; uint16_t port; int flag; @@ -531,6 +531,9 @@ nni_tcp_ep_connect(void *arg, void *pipearg) return (rv); } + if ((rv = nni_tcp_pipe_init(&pipe)) != 0) { + return (rv); + } pipe->proto = ep->proto; pipe->rcvmax = ep->rcvmax; @@ -540,12 +543,15 @@ nni_tcp_ep_connect(void *arg, void *pipearg) bindaddr = lclpart == NULL ? NULL : &lcladdr; rv = nni_plat_tcp_connect(pipe->tsp, &remaddr, bindaddr); if (rv != 0) { + nni_tcp_pipe_fini(pipe); return (rv); } if ((rv = nni_tcp_negotiate(pipe)) != 0) { + nni_tcp_pipe_fini(pipe); return (rv); } + *pipep = pipe; return (0); } @@ -582,27 +588,33 @@ nni_tcp_ep_bind(void *arg) static int -nni_tcp_ep_accept(void *arg, void *pipearg) +nni_tcp_ep_accept(void *arg, void **pipep) { nni_tcp_ep *ep = arg; - nni_tcp_pipe *pipe = pipearg; + nni_tcp_pipe *pipe; int rv; + if ((rv = nni_tcp_pipe_init(&pipe)) != 0) { + return (rv); + } pipe->proto = ep->proto; pipe->rcvmax = ep->rcvmax; + if ((rv = nni_plat_tcp_accept(pipe->tsp, ep->tsp)) != 0) { + nni_tcp_pipe_fini(pipe); return (rv); } if ((rv = nni_tcp_negotiate(pipe)) != 0) { + nni_tcp_pipe_fini(pipe); return (rv); } + *pipep = pipe; return (0); } static nni_tran_pipe nni_tcp_pipe_ops = { - .p_init = nni_tcp_pipe_init, .p_fini = nni_tcp_pipe_fini, .p_aio_send = nni_tcp_pipe_aio_send, .p_aio_recv = nni_tcp_pipe_aio_recv, |
