From 374f93a18edca2e0656c337a5b54927169ec31fa Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Wed, 29 Mar 2017 13:07:35 -0700 Subject: TCP (POSIX) async send/recv working. Other changes. Transport-level pipe initialization is now separate and explicit. The POSIX send/recv logic still uses threads under the hood, but makes use of the AIO framework for send/recv. This is a key stepping stone towards enabling poll() or similar async I/O approaches. --- src/core/aio.h | 4 +- src/core/defs.h | 2 + src/core/endpt.c | 4 +- src/core/pipe.c | 22 ++- src/core/platform.h | 11 +- src/core/transport.h | 29 ++-- src/platform/posix/posix_aio.h | 4 +- src/platform/posix/posix_aiothr.c | 15 ++- src/platform/posix/posix_impl.h | 6 - src/platform/posix/posix_net.c | 72 +++++++--- src/transport/inproc/inproc.c | 162 +++++++++++++--------- src/transport/ipc/ipc.c | 60 ++++----- src/transport/tcp/tcp.c | 276 ++++++++++++++++++++++++++++++-------- 13 files changed, 458 insertions(+), 209 deletions(-) (limited to 'src') diff --git a/src/core/aio.h b/src/core/aio.h index 96b04857..c377ca93 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -15,9 +15,7 @@ #include "core/taskq.h" #include "core/thread.h" -typedef struct nni_aio_ops nni_aio_ops; -typedef struct nni_aio nni_aio; - +typedef struct nni_aio_ops nni_aio_ops; // An nni_aio is an async I/O handle. struct nni_aio { diff --git a/src/core/defs.h b/src/core/defs.h index 82d0dfaf..1c44e9af 100644 --- a/src/core/defs.h +++ b/src/core/defs.h @@ -49,6 +49,8 @@ typedef int nni_signal; // Wakeup channel. typedef uint64_t nni_time; // Abs. time (usec). typedef int64_t nni_duration; // Rel. time (usec). +typedef struct nni_aio nni_aio; + typedef void (*nni_cb)(void *); // Used by transports for scatter gather I/O. 
diff --git a/src/core/endpt.c b/src/core/endpt.c index 0310783a..1108a504 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -207,7 +207,7 @@ nni_ep_connect(nni_ep *ep, nni_pipe **pp) if ((rv = nni_pipe_create(&pipe, ep, ep->ep_sock, ep->ep_tran)) != 0) { return (rv); } - rv = ep->ep_ops.ep_connect(ep->ep_data, pipe); + rv = ep->ep_ops.ep_connect(ep->ep_data, pipe->p_tran_data); if (rv != 0) { nni_pipe_destroy(pipe); return (rv); @@ -359,7 +359,7 @@ nni_ep_accept(nni_ep *ep, nni_pipe **pp) if ((rv = nni_pipe_create(&pipe, ep, ep->ep_sock, ep->ep_tran)) != 0) { return (rv); } - rv = ep->ep_ops.ep_accept(ep->ep_data, pipe); + rv = ep->ep_ops.ep_accept(ep->ep_data, pipe->p_tran_data); if (rv != 0) { nni_pipe_destroy(pipe); return (rv); diff --git a/src/core/pipe.c b/src/core/pipe.c index 85a0bd4a..4b37cfa9 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -39,14 +39,14 @@ nni_pipe_recv(nni_pipe *p, nng_msg **msgp) int nni_pipe_aio_recv(nni_pipe *p, nni_aio *aio) { - return (p->p_tran_ops.pipe_aio_recv(p->p_tran_data, aio)); + return (p->p_tran_ops.p_aio_recv(p->p_tran_data, aio)); } int nni_pipe_aio_send(nni_pipe *p, nni_aio *aio) { - return (p->p_tran_ops.pipe_aio_send(p->p_tran_data, aio)); + return (p->p_tran_ops.p_aio_send(p->p_tran_data, aio)); } @@ -92,7 +92,7 @@ nni_pipe_close(nni_pipe *p) // Close the underlying transport. if (p->p_tran_data != NULL) { - p->p_tran_ops.pipe_close(p->p_tran_data); + p->p_tran_ops.p_close(p->p_tran_data); } // Unregister our ID so nobody else can find it. @@ -116,7 +116,7 @@ nni_pipe_close(nni_pipe *p) uint16_t nni_pipe_peer(nni_pipe *p) { - return (p->p_tran_ops.pipe_peer(p->p_tran_data)); + return (p->p_tran_ops.p_peer(p->p_tran_data)); } @@ -145,7 +145,15 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran) // and we avoid an extra dereference on hot code paths. p->p_tran_ops = *tran->tran_pipe; + // Initialize the transport pipe data. 
+ if ((rv = p->p_tran_ops.p_init(&p->p_tran_data)) != 0) { + nni_mtx_fini(&p->p_mtx); + NNI_FREE_STRUCT(p); + return (rv); + } + if ((rv = nni_sock_pipe_add(sock, p)) != 0) { + p->p_tran_ops.p_fini(p->p_tran_data); nni_mtx_fini(&p->p_mtx); NNI_FREE_STRUCT(p); return (rv); @@ -164,7 +172,7 @@ nni_pipe_destroy(nni_pipe *p) // The caller is responsible for ensuring that the pipe // is not in use by any other consumers. It must not be started if (p->p_tran_data != NULL) { - p->p_tran_ops.pipe_destroy(p->p_tran_data); + p->p_tran_ops.p_fini(p->p_tran_data); } nni_sock_pipe_rem(p->p_sock, p); nni_mtx_fini(&p->p_mtx); @@ -176,10 +184,10 @@ int nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp) { /* This should only be called with the mutex held... */ - if (p->p_tran_ops.pipe_getopt == NULL) { + if (p->p_tran_ops.p_getopt == NULL) { return (NNG_ENOTSUP); } - return (p->p_tran_ops.pipe_getopt(p->p_tran_data, opt, val, szp)); + return (p->p_tran_ops.p_getopt(p->p_tran_data, opt, val, szp)); } diff --git a/src/core/platform.h b/src/core/platform.h index b1e824e2..1644973c 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -174,7 +174,7 @@ extern int nni_plat_lookup_host(const char *, nni_sockaddr *, int); // nni_plat_tcp_init initializes the socket, for example it can // set underlying file descriptors to -1, etc. -extern int nni_plat_tcp_init(nni_plat_tcpsock *); +extern int nni_plat_tcp_init(nni_plat_tcpsock **); // nni_plat_tcp_fini just closes a TCP socket, and releases any related // resources. @@ -203,14 +203,17 @@ extern int nni_plat_tcp_accept(nni_plat_tcpsock *, nni_plat_tcpsock *); extern int nni_plat_tcp_connect(nni_plat_tcpsock *, const nni_sockaddr *, const nni_sockaddr *); -// nni_plat_tcp_send sends data to the remote side. The platform is -// responsible for attempting to send all of the data. The iov count -// will never be larger than 4. THe platform may modify the iovs. 
+// nni_plat_tcp_aio_send sends the data to the remote side asynchronously. +// The data to send is stored in the a_iov field of the aio, and the array +// of iovs will never be larger than 4. The platform may modify the iovs, +// or the iov list. +extern int nni_plat_tcp_aio_send(nni_plat_tcpsock *, nni_aio *); extern int nni_plat_tcp_send(nni_plat_tcpsock *, nni_iov *, int); // nni_plat_tcp_recv recvs data into the buffers provided by the // iovs. The implementation does not return until the iovs are completely // full, or an error condition occurs. +extern int nni_plat_tcp_aio_recv(nni_plat_tcpsock *, nni_aio *); extern int nni_plat_tcp_recv(nni_plat_tcpsock *, nni_iov *, int); // nni_plat_ipc_init initializes the socket, for example it can diff --git a/src/core/transport.h b/src/core/transport.h index 65cd775f..ab43e497 100644 --- a/src/core/transport.h +++ b/src/core/transport.h @@ -48,8 +48,10 @@ struct nni_tran_ep { // ep_connect establishes a connection. It can return errors // NNG_EACCESS, NNG_ECONNREFUSED, NNG_EBADADDR, NNG_ECONNFAILED, - // NNG_ETIMEDOUT, and NNG_EPROTO. - int (*ep_connect)(void *, nni_pipe *); + // NNG_ETIMEDOUT, and NNG_EPROTO. The first argument is the + // transport specific endpoint, and the second is the transport + // specific pipe structure. + int (*ep_connect)(void *, void *); // ep_bind just does the bind() and listen() work, // reserving the address but not creating any connections. @@ -58,8 +60,10 @@ struct nni_tran_ep { // address, or NNG_EACCESS for permission problems. int (*ep_bind)(void *); - // ep_accept accepts an inbound connection. - int (*ep_accept)(void *, nni_pipe *); + // ep_accept accepts an inbound connection. The first argument + // is the transport-specific endpoint, and the second is the + // transport-specific pipe (which will have already been created.) + int (*ep_accept)(void *, void *); // ep_close stops the endpoint from operating altogether. It does // not affect pipes that have already been created. 
@@ -77,14 +81,17 @@ struct nni_tran_ep { // back into the socket at this point. (Which is one reason pointers back // to socket or even enclosing pipe state, are not provided.) struct nni_tran_pipe { - // p_destroy destroys the pipe. This should clean up all local + // p_init initializes the pipe structure, allocating the structure. + int (*p_init)(void **); + + // p_fini destroys the pipe. This should clean up all local // resources, including closing files and freeing memory, used by // the pipe. After this call returns, the system will not make // further calls on the same pipe. - void (*pipe_destroy)(void *); + void (*p_fini)(void *); - int (*pipe_aio_send)(void *, nni_aio *); - int (*pipe_aio_recv)(void *, nni_aio *); + int (*p_aio_send)(void *, nni_aio *); + int (*p_aio_recv)(void *, nni_aio *); // p_send sends the message. If the message cannot be received, then // the caller may try again with the same message (or free it). If @@ -104,15 +111,15 @@ struct nni_tran_pipe { // p_close closes the pipe. Further recv or send operations should // return back NNG_ECLOSED. - void (*pipe_close)(void *); + void (*p_close)(void *); // p_peer returns the peer protocol. This may arrive in whatever // transport specific manner is appropriate. - uint16_t (*pipe_peer)(void *); + uint16_t (*p_peer)(void *); // p_getopt gets an pipe (transport-specific) property. These values // may not be changed once the pipe is created. 
- int (*pipe_getopt)(void *, int, void *, size_t *); + int (*p_getopt)(void *, int, void *, size_t *); }; // These APIs are used by the framework internally, and not for use by diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h index 08c731bc..797f9e43 100644 --- a/src/platform/posix/posix_aio.h +++ b/src/platform/posix/posix_aio.h @@ -51,8 +51,8 @@ extern void nni_posix_aio_pipe_fini(nni_posix_aio_pipe *); // extern int nni_posix_aio_ep_init(nni_posix_aio_ep *, int); // extern void nni_posix_aio_ep_fini(nni_posix_aio_ep *); -extern int nni_posix_aio_read(nni_posix_aioq *, nni_aio *); -extern int nni_posix_aio_write(nni_posix_aioq *, nni_aio *); +extern int nni_posix_aio_read(nni_posix_aio_pipe *, nni_aio *); +extern int nni_posix_aio_write(nni_posix_aio_pipe *, nni_aio *); // extern int nni_posix_aio_connect(); // extern int nni_posix_aio_accept(); diff --git a/src/platform/posix/posix_aiothr.c b/src/platform/posix/posix_aiothr.c index 013e4599..2d31aede 100644 --- a/src/platform/posix/posix_aiothr.c +++ b/src/platform/posix/posix_aiothr.c @@ -178,16 +178,17 @@ nni_plat_aiothr_dothr(nni_posix_aioq *q, int (*fn)(int, nni_aio *)) } nni_list_remove(&q->aq_aios, aio); - nni_mtx_unlock(&q->aq_lk); + //nni_mtx_unlock(&q->aq_lk); // Call the callback. 
nni_aio_finish(aio, rv, aio->a_count); } while ((aio = nni_list_first(&q->aq_aios)) != NULL) { - nni_mtx_unlock(&q->aq_lk); + nni_list_remove(&q->aq_aios, aio); + //nni_mtx_unlock(&q->aq_lk); nni_aio_finish(aio, NNG_ECLOSED, aio->a_count); - nni_mtx_lock(&q->aq_lk); + //nni_mtx_lock(&q->aq_lk); } nni_mtx_unlock(&q->aq_lk); @@ -299,16 +300,16 @@ nni_posix_aio_submit(nni_posix_aioq *q, nni_aio *aio) int -nni_posix_aio_read(nni_posix_aioq *q, nni_aio *aio) +nni_posix_aio_read(nni_posix_aio_pipe *p, nni_aio *aio) { - return (nni_posix_aio_submit(q, aio)); + return (nni_posix_aio_submit(&p->ap_readq, aio)); } int -nni_posix_aio_write(nni_posix_aioq *q, nni_aio *aio) +nni_posix_aio_write(nni_posix_aio_pipe *p, nni_aio *aio) { - return (nni_posix_aio_submit(q, aio)); + return (nni_posix_aio_submit(&p->ap_writeq, aio)); } diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h index 56cc5937..5da18323 100644 --- a/src/platform/posix/posix_impl.h +++ b/src/platform/posix/posix_impl.h @@ -35,12 +35,6 @@ extern int nni_plat_errno(int); #endif -#ifdef PLATFORM_POSIX_NET -struct nni_plat_tcpsock { - int fd; - int devnull; // used for shutting down blocking accept() -}; -#endif #ifdef PLATFORM_POSIX_IPC struct nni_plat_ipcsock { diff --git a/src/platform/posix/posix_net.c b/src/platform/posix/posix_net.c index 24b45819..c7651655 100644 --- a/src/platform/posix/posix_net.c +++ b/src/platform/posix/posix_net.c @@ -8,6 +8,7 @@ // #include "core/nng_impl.h" +#include "platform/posix/posix_aio.h" #ifdef PLATFORM_POSIX_NET @@ -30,6 +31,14 @@ #define NNI_TCP_SOCKTYPE SOCK_STREAM #endif +#ifdef PLATFORM_POSIX_NET +struct nni_plat_tcpsock { + int fd; + int devnull; // used for shutting down blocking accept() + nni_posix_aio_pipe aiop; +}; +#endif + static int nni_plat_to_sockaddr(struct sockaddr_storage *ss, const nni_sockaddr *sa) { @@ -163,6 +172,20 @@ nni_plat_tcp_send(nni_plat_tcpsock *s, nni_iov *iovs, int cnt) } +int +nni_plat_tcp_aio_send(nni_plat_tcpsock 
*s, nni_aio *aio) +{ + return (nni_posix_aio_write(&s->aiop, aio)); +} + + +int +nni_plat_tcp_aio_recv(nni_plat_tcpsock *s, nni_aio *aio) +{ + return (nni_posix_aio_read(&s->aiop, aio)); +} + + int nni_plat_tcp_recv(nni_plat_tcpsock *s, nni_iov *iovs, int cnt) { @@ -241,32 +264,39 @@ nni_plat_tcp_setopts(int fd) int -nni_plat_tcp_init(nni_plat_tcpsock *s) +nni_plat_tcp_init(nni_plat_tcpsock **tspp) { - s->fd = -1; + nni_plat_tcpsock *tsp; + + if ((tsp = NNI_ALLOC_STRUCT(tsp)) == NULL) { + return (NNG_ENOMEM); + } + tsp->fd = -1; + *tspp = tsp; return (0); } void -nni_plat_tcp_fini(nni_plat_tcpsock *s) +nni_plat_tcp_fini(nni_plat_tcpsock *tsp) { - if (s->fd != -1) { - (void) close(s->fd); - s->fd = -1; + if (tsp->fd != -1) { + (void) close(tsp->fd); + tsp->fd = -1; } + NNI_FREE_STRUCT(tsp); } void -nni_plat_tcp_shutdown(nni_plat_tcpsock *s) +nni_plat_tcp_shutdown(nni_plat_tcpsock *tsp) { - if (s->fd != -1) { - (void) shutdown(s->fd, SHUT_RDWR); + if (tsp->fd != -1) { + (void) shutdown(tsp->fd, SHUT_RDWR); // This causes the equivalent of a close. Hopefully waking // up anything that didn't get the hint with the shutdown. // (macOS does not see the shtudown). - (void) dup2(nni_plat_devnull, s->fd); + (void) dup2(nni_plat_devnull, tsp->fd); } } @@ -278,7 +308,7 @@ nni_plat_tcp_shutdown(nni_plat_tcpsock *s) // to keep up, and your clients are going to experience bad things. Normally // the actual backlog should hover near 0 anyway.) int -nni_plat_tcp_listen(nni_plat_tcpsock *s, const nni_sockaddr *addr) +nni_plat_tcp_listen(nni_plat_tcpsock *tsp, const nni_sockaddr *addr) { int fd; int len; @@ -310,7 +340,7 @@ nni_plat_tcp_listen(nni_plat_tcpsock *s, const nni_sockaddr *addr) return (rv); } - s->fd = fd; + tsp->fd = fd; return (0); } @@ -319,7 +349,7 @@ nni_plat_tcp_listen(nni_plat_tcpsock *s, const nni_sockaddr *addr) // bind address is not null, then it will attempt to bind to the local // address specified first. 
int -nni_plat_tcp_connect(nni_plat_tcpsock *s, const nni_sockaddr *addr, +nni_plat_tcp_connect(nni_plat_tcpsock *tsp, const nni_sockaddr *addr, const nni_sockaddr *bindaddr) { int fd; @@ -358,15 +388,20 @@ nni_plat_tcp_connect(nni_plat_tcpsock *s, const nni_sockaddr *addr, (void) close(fd); return (rv); } - s->fd = fd; + if ((rv = nni_posix_aio_pipe_init(&tsp->aiop, fd)) != 0) { + (void) close(fd); + return (rv); + } + tsp->fd = fd; return (0); } int -nni_plat_tcp_accept(nni_plat_tcpsock *s, nni_plat_tcpsock *server) +nni_plat_tcp_accept(nni_plat_tcpsock *tsp, nni_plat_tcpsock *server) { int fd; + int rv; for (;;) { #ifdef NNG_USE_ACCEPT4 @@ -387,7 +422,12 @@ nni_plat_tcp_accept(nni_plat_tcpsock *s, nni_plat_tcpsock *server) nni_plat_tcp_setopts(fd); - s->fd = fd; + if ((rv = nni_posix_aio_pipe_init(&tsp->aiop, fd)) != 0) { + close(fd); + return (rv); + } + + tsp->fd = fd; return (0); } diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index de9b5e91..c6f908f8 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -38,22 +38,22 @@ struct nni_inproc_pipe { // nni_inproc_pair represents a pair of pipes. Because we control both // sides of the pipes, we can allocate and free this in one structure. 
struct nni_inproc_pair { - nni_mtx mx; - int refcnt; - nni_msgq * q[2]; - nni_inproc_pipe pipe[2]; - char addr[NNG_MAXADDRLEN+1]; + nni_mtx mx; + int refcnt; + nni_msgq * q[2]; + nni_inproc_pipe * pipes[2]; + char addr[NNG_MAXADDRLEN+1]; }; struct nni_inproc_ep { - char addr[NNG_MAXADDRLEN+1]; - int mode; - int closed; - nni_list_node node; - uint16_t proto; - nni_cv cv; - nni_list clients; - void * cpipe; // connected pipe (DIAL only) + char addr[NNG_MAXADDRLEN+1]; + int mode; + int closed; + nni_list_node node; + uint16_t proto; + nni_cv cv; + nni_list clients; + nni_inproc_pipe * cpipe; // connected pipe (DIAL only) }; #define NNI_INPROC_EP_IDLE 0 @@ -108,23 +108,45 @@ nni_inproc_pair_destroy(nni_inproc_pair *pair) } +static int +nni_inproc_pipe_init(void **argp) +{ + nni_inproc_pipe *pipe; + + if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { + return (NNG_ENOMEM); + } + *argp = pipe; + return (0); +} + + static void -nni_inproc_pipe_destroy(void *arg) +nni_inproc_pipe_fini(void *arg) { nni_inproc_pipe *pipe = arg; - nni_inproc_pair *pair = pipe->pair; + nni_inproc_pair *pair; // We could assert the pipe closed... - // If we are the last peer, then toss the pair structure. - nni_mtx_lock(&pair->mx); - pair->refcnt--; - if (pair->refcnt == 0) { - nni_mtx_unlock(&pair->mx); - nni_inproc_pair_destroy(pair); - } else { - nni_mtx_unlock(&pair->mx); + if ((pair = pipe->pair) != NULL) { + // If we are the last peer, then toss the pair structure. 
+ nni_mtx_lock(&pair->mx); + if (pair->pipes[0] == pipe) { + pair->pipes[0] = NULL; + } else if (pair->pipes[1] == pipe) { + pair->pipes[1] = NULL; + } + pair->refcnt--; + if (pair->refcnt == 0) { + nni_mtx_unlock(&pair->mx); + nni_inproc_pair_destroy(pair); + } else { + nni_mtx_unlock(&pair->mx); + } } + + NNI_FREE_STRUCT(pipe); } @@ -262,49 +284,54 @@ nni_inproc_ep_close(void *arg) static int -nni_inproc_ep_connect(void *arg, nni_pipe *npipe) +nni_inproc_ep_connect(void *arg, void *pipearg) { + nni_inproc_pipe *pipe = pipearg; nni_inproc_ep *ep = arg; + nni_inproc_ep *server; if (ep->mode != NNI_INPROC_EP_IDLE) { return (NNG_EINVAL); } nni_mtx_lock(&nni_inproc.mx); - for (;;) { - nni_inproc_ep *server; - if (ep->closed) { - nni_mtx_unlock(&nni_inproc.mx); - return (NNG_ECLOSED); + // Find a server. + NNI_LIST_FOREACH (&nni_inproc.servers, server) { + if (server->mode != NNI_INPROC_EP_LISTEN) { + continue; } - if (ep->cpipe != NULL) { + if (strcmp(server->addr, ep->addr) == 0) { break; } - // Find a server. 
- NNI_LIST_FOREACH (&nni_inproc.servers, server) { - if (server->mode != NNI_INPROC_EP_LISTEN) { - continue; - } - if (strcmp(server->addr, ep->addr) == 0) { - break; - } - } - if (server == NULL) { + } + if (server == NULL) { + nni_mtx_unlock(&nni_inproc.mx); + return (NNG_ECONNREFUSED); + } + + ep->mode = NNI_INPROC_EP_DIAL; + ep->cpipe = pipe; + nni_list_append(&server->clients, ep); + + while (ep->mode != NNI_INPROC_EP_IDLE) { + if (ep->closed) { + nni_list_remove(&server->clients, ep); nni_mtx_unlock(&nni_inproc.mx); - return (NNG_ECONNREFUSED); + return (NNG_ECLOSED); } - - ep->mode = NNI_INPROC_EP_DIAL; - nni_list_append(&server->clients, ep); nni_cv_wake(&server->cv); nni_cv_wait(&ep->cv); - if (ep->mode == NNI_INPROC_EP_DIAL) { - ep->mode = NNI_INPROC_EP_IDLE; - nni_list_remove(&server->clients, ep); - } } - nni_pipe_set_tran_data(npipe, ep->cpipe); - ep->cpipe = NULL; + + // If we got here, either we connected successfully, or the far end + // server closed on us. In the former case our cpipe will be NULL, + // having been cleared by the server. In the latter, the cpipe will + // still be set, indicating server shutdown. 
+ if (ep->cpipe != NULL) { + nni_mtx_unlock(&nni_inproc.mx); + return (NNG_ECONNRESET); + } + nni_mtx_unlock(&nni_inproc.mx); return (0); } @@ -342,9 +369,10 @@ nni_inproc_ep_bind(void *arg) static int -nni_inproc_ep_accept(void *arg, nni_pipe *npipe) +nni_inproc_ep_accept(void *arg, void *pipearg) { nni_inproc_ep *ep = arg; + nni_inproc_pipe *pipe = pipearg; nni_inproc_ep *client; nni_inproc_pair *pair; int rv; @@ -383,17 +411,18 @@ nni_inproc_ep_accept(void *arg, nni_pipe *npipe) } nni_list_remove(&ep->clients, client); - client->mode = NNI_INPROC_EP_IDLE; + pair->pipes[0] = client->cpipe; + pair->pipes[1] = pipe; (void) snprintf(pair->addr, sizeof (pair->addr), "%s", ep->addr); - pair->pipe[0].rq = pair->pipe[1].wq = pair->q[0]; - pair->pipe[1].rq = pair->pipe[0].wq = pair->q[1]; - pair->pipe[0].pair = pair->pipe[1].pair = pair; - pair->pipe[0].addr = pair->pipe[1].addr = pair->addr; - pair->pipe[1].peer = client->proto; - pair->pipe[0].peer = ep->proto; + pair->pipes[0]->rq = pair->pipes[1]->wq = pair->q[0]; + pair->pipes[1]->rq = pair->pipes[0]->wq = pair->q[1]; + pair->pipes[0]->pair = pair->pipes[1]->pair = pair; + pair->pipes[0]->addr = pair->pipes[1]->addr = pair->addr; + pair->pipes[1]->peer = client->proto; + pair->pipes[0]->peer = ep->proto; pair->refcnt = 2; - client->cpipe = &pair->pipe[0]; - nni_pipe_set_tran_data(npipe, &pair->pipe[1]); + client->mode = NNI_INPROC_EP_IDLE; + client->cpipe = NULL; nni_cv_wake(&client->cv); nni_mtx_unlock(&nni_inproc.mx); @@ -403,12 +432,13 @@ nni_inproc_ep_accept(void *arg, nni_pipe *npipe) static nni_tran_pipe nni_inproc_pipe_ops = { - .pipe_destroy = nni_inproc_pipe_destroy, - .pipe_aio_send = nni_inproc_pipe_aio_send, - .pipe_aio_recv = nni_inproc_pipe_aio_recv, - .pipe_close = nni_inproc_pipe_close, - .pipe_peer = nni_inproc_pipe_peer, - .pipe_getopt = nni_inproc_pipe_getopt, + .p_init = nni_inproc_pipe_init, + .p_fini = nni_inproc_pipe_fini, + .p_aio_send = nni_inproc_pipe_aio_send, + .p_aio_recv = 
nni_inproc_pipe_aio_recv, + .p_close = nni_inproc_pipe_close, + .p_peer = nni_inproc_pipe_peer, + .p_getopt = nni_inproc_pipe_getopt, }; static nni_tran_ep nni_inproc_ep_ops = { diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 11965d17..46003487 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -59,8 +59,26 @@ nni_ipc_pipe_close(void *arg) } +static int +nni_ipc_pipe_init(void **argp) +{ + nni_ipc_pipe *pipe; + int rv; + + if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_plat_ipc_init(&pipe->fd)) != 0) { + NNI_FREE_STRUCT(pipe); + return (rv); + } + *argp = pipe; + return (0); +} + + static void -nni_ipc_pipe_destroy(void *arg) +nni_ipc_pipe_fini(void *arg) { nni_ipc_pipe *pipe = arg; @@ -260,10 +278,10 @@ nni_ipc_negotiate(nni_ipc_pipe *pipe) static int -nni_ipc_ep_connect(void *arg, nni_pipe *npipe) +nni_ipc_ep_connect(void *arg, void *pipearg) { nni_ipc_ep *ep = arg; - nni_ipc_pipe *pipe; + nni_ipc_pipe *pipe = pipearg; int rv; const char *path; @@ -272,13 +290,6 @@ nni_ipc_ep_connect(void *arg, nni_pipe *npipe) } path = ep->addr + strlen("ipc://"); - if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { - return (NNG_ENOMEM); - } - if ((rv = nni_plat_ipc_init(&pipe->fd)) != 0) { - NNI_FREE_STRUCT(pipe); - return (rv); - } pipe->proto = ep->proto; pipe->rcvmax = ep->rcvmax; @@ -291,11 +302,8 @@ nni_ipc_ep_connect(void *arg, nni_pipe *npipe) if ((rv = nni_ipc_negotiate(pipe)) != 0) { nni_plat_ipc_shutdown(&pipe->fd); - nni_plat_ipc_fini(&pipe->fd); - NNI_FREE_STRUCT(pipe); return (rv); } - nni_pipe_set_tran_data(npipe, pipe); return (0); } @@ -321,22 +329,14 @@ nni_ipc_ep_bind(void *arg) static int -nni_ipc_ep_accept(void *arg, nni_pipe *npipe) +nni_ipc_ep_accept(void *arg, void *pipearg) { nni_ipc_ep *ep = arg; - nni_ipc_pipe *pipe; + nni_ipc_pipe *pipe = pipearg; int rv; - - if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { - return (NNG_ENOMEM); - } pipe->proto = ep->proto; pipe->rcvmax = 
ep->rcvmax; - if ((rv = nni_plat_ipc_init(&pipe->fd)) != 0) { - NNI_FREE_STRUCT(pipe); - return (rv); - } if ((rv = nni_plat_ipc_accept(&pipe->fd, &ep->fd)) != 0) { nni_plat_ipc_fini(&pipe->fd); @@ -345,22 +345,18 @@ nni_ipc_ep_accept(void *arg, nni_pipe *npipe) } if ((rv = nni_ipc_negotiate(pipe)) != 0) { nni_plat_ipc_shutdown(&pipe->fd); - nni_plat_ipc_fini(&pipe->fd); - NNI_FREE_STRUCT(pipe); return (rv); } - nni_pipe_set_tran_data(npipe, pipe); return (0); } static nni_tran_pipe nni_ipc_pipe_ops = { - .pipe_destroy = nni_ipc_pipe_destroy, - .pipe_send = nni_ipc_pipe_send, - .pipe_recv = nni_ipc_pipe_recv, - .pipe_close = nni_ipc_pipe_close, - .pipe_peer = nni_ipc_pipe_peer, - .pipe_getopt = nni_ipc_pipe_getopt, + .p_init = nni_ipc_pipe_init, + .p_fini = nni_ipc_pipe_fini, + .p_close = nni_ipc_pipe_close, + .p_peer = nni_ipc_pipe_peer, + .p_getopt = nni_ipc_pipe_getopt, }; static nni_tran_ep nni_ipc_ep_ops = { diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index e198a927..f220e49e 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -22,21 +22,34 @@ typedef struct nni_tcp_ep nni_tcp_ep; // nni_tcp_pipe is one end of a TCP connection. 
struct nni_tcp_pipe { const char * addr; - nni_plat_tcpsock fd; + nni_plat_tcpsock * tsp; uint16_t peer; uint16_t proto; size_t rcvmax; + + nni_aio * user_txaio; + nni_aio * user_rxaio; + + uint8_t txlen[sizeof (uint64_t)]; + uint8_t rxlen[sizeof (uint64_t)]; + nni_aio txaio; + nni_aio rxaio; + nni_msg * rxmsg; }; struct nni_tcp_ep { char addr[NNG_MAXADDRLEN+1]; - nni_plat_tcpsock fd; + nni_plat_tcpsock * tsp; int closed; uint16_t proto; size_t rcvmax; int ipv4only; }; + +static void nni_tcp_pipe_send_cb(void *); +static void nni_tcp_pipe_recv_cb(void *); + static int nni_tcp_tran_init(void) { @@ -55,23 +68,201 @@ nni_tcp_pipe_close(void *arg) { nni_tcp_pipe *pipe = arg; - nni_plat_tcp_shutdown(&pipe->fd); + nni_plat_tcp_shutdown(pipe->tsp); +} + + +static int +nni_tcp_pipe_init(void **argp) +{ + nni_tcp_pipe *pipe; + int rv; + + if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_plat_tcp_init(&pipe->tsp)) != 0) { + NNI_FREE_STRUCT(pipe); + return (rv); + } + rv = nni_aio_init(&pipe->txaio, nni_tcp_pipe_send_cb, pipe); + if (rv != 0) { + nni_plat_tcp_fini(pipe->tsp); + NNI_FREE_STRUCT(pipe); + return (rv); + } + rv = nni_aio_init(&pipe->rxaio, nni_tcp_pipe_recv_cb, pipe); + if (rv != 0) { + nni_aio_fini(&pipe->txaio); + nni_plat_tcp_fini(pipe->tsp); + NNI_FREE_STRUCT(pipe); + } + *argp = pipe; + return (0); } static void -nni_tcp_pipe_destroy(void *arg) +nni_tcp_pipe_fini(void *arg) { nni_tcp_pipe *pipe = arg; - nni_plat_tcp_fini(&pipe->fd); + nni_aio_fini(&pipe->rxaio); + nni_aio_fini(&pipe->txaio); + nni_plat_tcp_fini(pipe->tsp); NNI_FREE_STRUCT(pipe); } +static void +nni_tcp_pipe_send_cb(void *arg) +{ + nni_tcp_pipe *pipe = arg; + int rv; + nni_aio *aio; + size_t len; + + if ((aio = pipe->user_txaio) == NULL) { + // This should never ever happen. 
+ NNI_ASSERT(aio != NULL); + return; + } + pipe->user_txaio = NULL; + + if ((rv = nni_aio_result(&pipe->txaio)) != 0) { + nni_aio_finish(aio, rv, 0); + return; + } + + len = nni_msg_len(aio->a_msg); + nni_msg_free(aio->a_msg); + aio->a_msg = NULL; + + nni_aio_finish(aio, 0, len); +} + + +static void +nni_tcp_pipe_recv_cb(void *arg) +{ + nni_tcp_pipe *pipe = arg; + nni_aio *aio; + int rv; + + aio = pipe->user_rxaio; + if (aio == NULL) { + // This should never ever happen. + NNI_ASSERT(aio != NULL); + return; + } + + if ((rv = nni_aio_result(&pipe->rxaio)) != 0) { + // Error on receive. This has to cause an error back + // to the user. Also, if we had allocated an rxmsg, lets + // toss it. + if (pipe->rxmsg != NULL) { + nni_msg_free(pipe->rxmsg); + pipe->rxmsg = NULL; + } + pipe->user_rxaio = NULL; + nni_aio_finish(aio, rv, 0); + return; + } + + // If we don't have a message yet, we were reading the TCP message + // header, which is just the length. This tells us the size of the + // message to allocate and how much more to expect. + if (pipe->rxmsg == NULL) { + uint64_t len; + // We should have gotten a message header. + NNI_GET64(pipe->rxlen, len); + + // Make sure the message payload is not too big. If it is + // the caller will shut down the pipe. + if (len > pipe->rcvmax) { + pipe->user_rxaio = NULL; + nni_aio_finish(aio, NNG_EMSGSIZE, 0); + return; + } + + if ((rv = nng_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) { + pipe->user_rxaio = NULL; + nni_aio_finish(aio, rv, 0); + return; + } + + // Submit the rest of the data for a read -- we want to + // read the entire message now. 
+ pipe->rxaio.a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg); + pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg); + pipe->rxaio.a_niov = 1; + + rv = nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio); + if (rv != 0) { + pipe->user_rxaio = NULL; + nni_msg_free(pipe->rxmsg); + pipe->rxmsg = NULL; + nni_aio_finish(aio, rv, 0); + return; + } + return; + } + + // Otherwise we got a message read completely. Let the user know the + // good news. + pipe->user_rxaio = NULL; + aio->a_msg = pipe->rxmsg; + pipe->rxmsg = NULL; + nni_aio_finish(aio, 0, nni_msg_len(aio->a_msg)); +} + + +static int +nni_tcp_pipe_aio_send(void *arg, nni_aio *aio) +{ + nni_tcp_pipe *pipe = arg; + nni_msg *msg = aio->a_msg; + uint64_t len; + + pipe->user_txaio = aio; + + len = nni_msg_len(msg) + nni_msg_header_len(msg); + NNI_PUT64(pipe->txlen, len); + + pipe->txaio.a_iov[0].iov_buf = pipe->txlen; + pipe->txaio.a_iov[0].iov_len = sizeof (pipe->txlen); + pipe->txaio.a_iov[1].iov_buf = nni_msg_header(msg); + pipe->txaio.a_iov[1].iov_len = nni_msg_header_len(msg); + pipe->txaio.a_iov[2].iov_buf = nni_msg_body(msg); + pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg); + pipe->txaio.a_niov = 3; + + return (nni_plat_tcp_aio_send(pipe->tsp, &pipe->txaio)); +} + + +static int +nni_tcp_pipe_aio_recv(void *arg, nni_aio *aio) +{ + nni_tcp_pipe *pipe = arg; + + pipe->user_rxaio = aio; + + NNI_ASSERT(pipe->rxmsg == NULL); + + // Schedule a read of the TCP header. 
+ pipe->rxaio.a_iov[0].iov_buf = pipe->rxlen; + pipe->rxaio.a_iov[0].iov_len = sizeof (pipe->rxlen); + pipe->rxaio.a_niov = 1; + + return (nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio)); +} + + static int nni_tcp_pipe_send(void *arg, nni_msg *msg) { +#if 0 nni_tcp_pipe *pipe = arg; uint64_t len; uint8_t buf[sizeof (len)]; @@ -88,10 +279,13 @@ nni_tcp_pipe_send(void *arg, nni_msg *msg) len = (uint64_t) iov[1].iov_len + (uint64_t) iov[2].iov_len; NNI_PUT64(buf, len); - if ((rv = nni_plat_tcp_send(&pipe->fd, iov, 3)) == 0) { + if ((rv = nni_plat_tcp_send(pipe->tsp, iov, 3)) == 0) { nni_msg_free(msg); } return (rv); + +#endif + return (NNG_EINVAL); } @@ -107,7 +301,7 @@ nni_tcp_pipe_recv(void *arg, nni_msg **msgp) iov[0].iov_buf = buf; iov[0].iov_len = sizeof (buf); - if ((rv = nni_plat_tcp_recv(&pipe->fd, iov, 1)) != 0) { + if ((rv = nni_plat_tcp_recv(pipe->tsp, iov, 1)) != 0) { return (rv); } NNI_GET64(buf, len); @@ -122,7 +316,7 @@ nni_tcp_pipe_recv(void *arg, nni_msg **msgp) iov[0].iov_len = nng_msg_len(msg); iov[0].iov_buf = nng_msg_body(msg); - if ((rv = nni_plat_tcp_recv(&pipe->fd, iov, 1)) == 0) { + if ((rv = nni_plat_tcp_recv(pipe->tsp, iov, 1)) == 0) { *msgp = msg; } else { nni_msg_free(msg); @@ -181,7 +375,7 @@ nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock) ep->ipv4only = 0; // XXX: FIXME ep->rcvmax = nni_sock_rcvmaxsz(sock); - if ((rv = nni_plat_tcp_init(&ep->fd)) != 0) { + if ((rv = nni_plat_tcp_init(&ep->tsp)) != 0) { NNI_FREE_STRUCT(ep); return (rv); } @@ -198,7 +392,7 @@ nni_tcp_ep_fini(void *arg) { nni_tcp_ep *ep = arg; - nni_plat_tcp_fini(&ep->fd); + nni_plat_tcp_fini(ep->tsp); NNI_FREE_STRUCT(ep); } @@ -208,7 +402,7 @@ nni_tcp_ep_close(void *arg) { nni_tcp_ep *ep = arg; - nni_plat_tcp_shutdown(&ep->fd); + nni_plat_tcp_shutdown(ep->tsp); } @@ -281,13 +475,13 @@ nni_tcp_negotiate(nni_tcp_pipe *pipe) iov.iov_buf = buf; iov.iov_len = 8; - if ((rv = nni_plat_tcp_send(&pipe->fd, &iov, 1)) != 0) { + if ((rv = nni_plat_tcp_send(pipe->tsp, 
&iov, 1)) != 0) { return (rv); } iov.iov_buf = buf; iov.iov_len = 8; - if ((rv = nni_plat_tcp_recv(&pipe->fd, &iov, 1)) != 0) { + if ((rv = nni_plat_tcp_recv(pipe->tsp, &iov, 1)) != 0) { return (rv); } @@ -303,10 +497,10 @@ nni_tcp_negotiate(nni_tcp_pipe *pipe) static int -nni_tcp_ep_connect(void *arg, nni_pipe *npipe) +nni_tcp_ep_connect(void *arg, void *pipearg) { nni_tcp_ep *ep = arg; - nni_tcp_pipe *pipe; + nni_tcp_pipe *pipe = pipearg; char *host; uint16_t port; int flag; @@ -350,13 +544,6 @@ nni_tcp_ep_connect(void *arg, nni_pipe *npipe) return (rv); } - if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { - return (NNG_ENOMEM); - } - if ((rv = nni_plat_tcp_init(&pipe->fd)) != 0) { - NNI_FREE_STRUCT(pipe); - return (rv); - } pipe->proto = ep->proto; pipe->rcvmax = ep->rcvmax; @@ -364,20 +551,15 @@ nni_tcp_ep_connect(void *arg, nni_pipe *npipe) remaddr.s_un.s_in.sa_port = port; bindaddr = lclpart == NULL ? NULL : &lcladdr; - rv = nni_plat_tcp_connect(&pipe->fd, &remaddr, bindaddr); + rv = nni_plat_tcp_connect(pipe->tsp, &remaddr, bindaddr); if (rv != 0) { - nni_plat_tcp_fini(&pipe->fd); - NNI_FREE_STRUCT(pipe); return (rv); } if ((rv = nni_tcp_negotiate(pipe)) != 0) { - nni_plat_tcp_shutdown(&pipe->fd); - nni_plat_tcp_fini(&pipe->fd); - NNI_FREE_STRUCT(pipe); + nni_plat_tcp_shutdown(pipe->tsp); return (rv); } - nni_pipe_set_tran_data(npipe, pipe); return (0); } @@ -406,7 +588,7 @@ nni_tcp_ep_bind(void *arg) } baddr.s_un.s_in.sa_port = port; - if ((rv = nni_plat_tcp_listen(&ep->fd, &baddr)) != 0) { + if ((rv = nni_plat_tcp_listen(ep->tsp, &baddr)) != 0) { return (rv); } return (0); @@ -414,46 +596,34 @@ nni_tcp_ep_bind(void *arg) static int -nni_tcp_ep_accept(void *arg, nni_pipe *npipe) +nni_tcp_ep_accept(void *arg, void *pipearg) { nni_tcp_ep *ep = arg; - nni_tcp_pipe *pipe; + nni_tcp_pipe *pipe = pipearg; int rv; - - if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { - return (NNG_ENOMEM); - } pipe->proto = ep->proto; pipe->rcvmax = ep->rcvmax; - if ((rv = 
nni_plat_tcp_init(&pipe->fd)) != 0) { - NNI_FREE_STRUCT(pipe); - } - - if ((rv = nni_plat_tcp_accept(&pipe->fd, &ep->fd)) != 0) { - nni_plat_tcp_fini(&pipe->fd); - NNI_FREE_STRUCT(pipe); + if ((rv = nni_plat_tcp_accept(pipe->tsp, ep->tsp)) != 0) { return (rv); } if ((rv = nni_tcp_negotiate(pipe)) != 0) { - nni_plat_tcp_shutdown(&pipe->fd); - nni_plat_tcp_fini(&pipe->fd); - NNI_FREE_STRUCT(pipe); + nni_plat_tcp_shutdown(pipe->tsp); return (rv); } - nni_pipe_set_tran_data(npipe, pipe); return (0); } static nni_tran_pipe nni_tcp_pipe_ops = { - .pipe_destroy = nni_tcp_pipe_destroy, - .pipe_send = nni_tcp_pipe_send, - .pipe_recv = nni_tcp_pipe_recv, - .pipe_close = nni_tcp_pipe_close, - .pipe_peer = nni_tcp_pipe_peer, - .pipe_getopt = nni_tcp_pipe_getopt, + .p_init = nni_tcp_pipe_init, + .p_fini = nni_tcp_pipe_fini, + .p_aio_send = nni_tcp_pipe_aio_send, + .p_aio_recv = nni_tcp_pipe_aio_recv, + .p_close = nni_tcp_pipe_close, + .p_peer = nni_tcp_pipe_peer, + .p_getopt = nni_tcp_pipe_getopt, }; static nni_tran_ep nni_tcp_ep_ops = { -- cgit v1.2.3-70-g09d2